Summary

This vignette serves as a protocol for the processing of event-based rainfall data. The steps below cover imbibing, standardisation, and processing; the code to realise each step follows. Data processing is done in R with ‘ipayipi’. The data were collected using Hobo data loggers installed in Texas tipping-bucket rain gauges.

  1. Load R ipayipi (install from GitHub),
  2. Initiate pipeline,
  3. Imbibe data,
  4. Standardise data,
  5. Make station files,
  6. Identify gaps, &
  7. Process data.

The overview below shows the code needed to execute the steps above. Each of these steps is described further in the following sections of this vignette. For more detail and options for each step, see the respective function documentation/help files in R. The help files serve as the documentation protocol for each function used in ‘ipayipi’.

## 1. load package ----
library(ipayipi)

## 2. initiate pipeline ----
# define the general working directory: wherever data is going to be processed ...
wd <- "ipayipi_vignettes/rainfall_eg"

# setup pipeline directory structure
pipe_house <- ipip_house(work_dir = wd)

## 3. read & imbibe data ----
logger_data_import_batch(pipe_house)
imbibe_raw_batch(pipe_house, data_setup = ipayipi::hobo_rain)

## 4. standardise data ----
header_sts(pipe_house)
phenomena_sts(pipe_house)

## store standardised data ----
# transfer standardised data to the nomvet_room
transfer_sts_files(pipe_house)

## 5. append standardised data files ----
append_station_batch(pipe_house)

## 6. summarise gaps ----
gap_eval_batch(pipe_house)

## 7. Process data ----
dt_process_batch(pipe_house = pipe_house, pipe_seq = pipe_seq)

Introduction

This vignette serves as a protocol for processing rainfall data, largely from Texas tipping-bucket rain gauges, using R ‘ipayipi’. ipayipi facilitates a dynamic and structured pipeline so that processing down a pipeline (and back up again) is traceable.

Texas tipping-bucket rain gauges with Hobo data loggers record rainfall events in a discontinuous or ‘event-based’ manner; that is, the tipping bucket of known volume ‘tips’ each time it fills with water from the gauge. Each tip gets tallied as an event so that cumulative rainfall can be recorded. Note that this ‘discontinuous’ event recording contrasts with the ‘continuous’ record-interval type associated with many default logger settings, which record a sample of a phenomenon at a specific time and set interval, or its average over a set record interval. The ‘discontinuous’ nature of event-based time-series data has important implications for how the data are processed.
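To make the ‘tip’ logic concrete, here is a minimal base-R sketch (independent of ipayipi, with made-up cumulative values) that recovers the rainfall per tip from a cumulative record:

```r
# cumulative rainfall (mm) logged at each tip event (hypothetical values)
tips <- data.frame(
  date_time = as.POSIXct(c(
    "2019-01-16 06:23:18", "2019-01-16 06:28:15",
    "2019-01-16 06:36:08", "2019-01-16 11:24:44"
  ), tz = "UTC"),
  rain_cumm = c(0.254, 0.508, 0.762, 1.016)
)

# rainfall per tip = difference between successive cumulative totals
tips$rain_mm <- c(tips$rain_cumm[1], diff(tips$rain_cumm))
tips$rain_mm # each tip here is 0.254 mm, i.e., one hundredth of an inch
```

Note that no record exists between tips: the time axis is irregular, which is exactly what ‘event_based’ record intervals describe.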

Initiate pipeline: the ‘pipe_house’

The ‘pipe_house’ is the name of the directory structure wherein a data-pipeline is maintained. By initiating the pipeline, a number of directories will be created for ‘housing’ data at various stages of archival and processing, as well as storing scripts, and reports.

To start off, the relative directory where processing is going to be done must be defined.

# setting up the 'pipe_house'
## define pipeline working directory 'wd' (must be relative)

wd <- "ipayipi_vignettes/rainfall_eg"

## initiate pipeline
pipe_house <- ipip_house(work_dir = wd)
print(pipe_house)
## $r
##                                 r 
## "ipayipi_vignettes/rainfall_eg/r" 
## 
## $source_room
##                                 source_room 
## "ipayipi_vignettes/rainfall_eg/source_room" 
## 
## $wait_room
##                                 wait_room 
## "ipayipi_vignettes/rainfall_eg/wait_room" 
## 
## $nomvet_room
##                                 nomvet_room 
## "ipayipi_vignettes/rainfall_eg/nomvet_room" 
## 
## $ipip_room
##                                 ipip_room 
## "ipayipi_vignettes/rainfall_eg/ipip_room" 
## 
## $dta_out
##                                 dta_out 
## "ipayipi_vignettes/rainfall_eg/dta_out" 
## 
## $reports
##                                 reports 
## "ipayipi_vignettes/rainfall_eg/reports" 
## 
## $raw_room
##                                 raw_room 
## "ipayipi_vignettes/rainfall_eg/raw_room" 
## 
## $work_dir
## [1] "ipayipi_vignettes/rainfall_eg"

What has ipip_house() done? It has created the following directories, if they don’t already exist*:

  1. ‘r’: Folder for r scripts. Note the ‘pipe_seq’ folder contained therein—this will be used to store parameters for processing data later in the pipeline.
  2. ‘source_room’: where ‘new’/incoming logger data is going to be made available.
  3. ‘wait_room’: waiting room for imbibing & standardising data coming into the pipeline.
  4. ‘nomvet_room’: where standardised/corrected logger files get archived (nomenclature vetted).
  5. ‘ipip_room’: here data get appended into single station records, and processed.
  6. ‘dta_out’: Folder to which processed data can be exported in csv format.
  7. ‘reports’: Folder for general reports, e.g., references or markdown documents.
  8. ‘raw_room’: where ‘unaltered’ raw data gets pushed.

*NB! Running this function will not overwrite existing data.
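For orientation only, the scaffolding above can be mimicked in base R. The sketch below (run in a temporary directory) illustrates the idea; ipip_house() itself does more, e.g., returning the named list of room paths used by the other pipeline functions:

```r
demo_dir <- file.path(tempdir(), "rainfall_eg")
rooms <- c("r", "source_room", "wait_room", "nomvet_room",
           "ipip_room", "dta_out", "reports", "raw_room")
# dir.create() is a no-op for existing folders, so nothing is overwritten
invisible(lapply(file.path(demo_dir, rooms), dir.create,
  recursive = TRUE, showWarnings = FALSE))
pipe_house_sketch <- as.list(file.path(demo_dir, rooms))
names(pipe_house_sketch) <- rooms
```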

Imbibe data

In this step, incoming data gets pulled from the pipeline’s data source, that is, the ‘source room’ (pipe_house$source_room), into the ‘waiting room’ (pipe_house$wait_room). The example data contains multiple years of hobo-rainfall data exports from SAEON rainfall stations in northern Maputaland, South Africa.

# copy data from source to the wait_room
logger_data_import_batch(pipe_house = pipe_house)

Now that some data are in the ‘wait_room’ directory we can read them into R. Note the ‘data_setup’ option for Hobo data-logger file exports: ipayipi::hobo_rain. Also, the record_interval_type has been set to ‘event_based’; other options are ‘mixed’ and ‘continuous’. Use ‘event_based’ for hobo data (including cases where it is ‘mixed’, to cater for changes in future outputs). More on this below.

imbibe_raw_batch(pipe_house = pipe_house,
  data_setup = ipayipi::hobo_rain, # standard for reading hobo file exports 
  record_interval_type = "event_based"
)

For additional insight into data-input formats, that is, the ‘data_setup’ argument, see the help files of the imbibe_raw_logger_dt() function (i.e., ?imbibe_raw_logger_dt). The ipayipi::hobo_rain setup caters for three variations in format associated with Hoboware exports. These three variations allow the import of hundreds of differently formatted (unencrypted) Hobo data file exports, and more variations can be added.

Record-interval type is an important parameter. ipayipi handles continuous, event-based (discontinuous), and mixed time-series data types. Record intervals get evaluated using the record_interval_eval() function. Record-interval information will be important for further steps, such as identifying ‘gaps’ or missing data automatically. More on this below.
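As a rough illustration of the idea behind record-interval evaluation (this is not the record_interval_eval() implementation): for continuous data the differences between successive time stamps are constant, whereas event-based data yield irregular differences.

```r
# time stamps from a hypothetical continuous record
rec <- as.POSIXct(c("2019-01-14 16:00", "2019-01-14 16:05",
  "2019-01-14 16:10", "2019-01-14 16:15"), tz = "UTC")
d <- diff(as.numeric(rec)) # seconds between successive records
is_continuous <- length(unique(d)) == 1
is_continuous # TRUE: a fixed five-minute (300 s) interval
```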

Standardise data

Both file-header information, plus other phenomena (variable) metadata, will now be standardised. The spelling/synonyms of file names and associated header metadata have to be scrutinised first. Only after header information gets standardised can we move onto the phenomena.

# standardise header nomenclature
header_sts(pipe_house)

If it is the first time running header_sts(), or new synonyms are introduced into the pipe-house directory, header_sts() will produce a warning. This normally happens when a station name is adjusted in a logger program, or when a new station is introduced into the pipeline; the pipeline then needs the new nomenclature standards to be defined.

Unstandardised names (or columns) have the prefix ‘uz_’. These standards get stored in a file called ‘nomtab.rns’ in the ‘waiting room’. If this file is deleted, a new one will be generated, but the user will have to repopulate the tables with synonym vocabulary.

The nomenclature table in the ‘waiting room’ can be updated from the csv file (or directly in R). If a new synonym is introduced, the file containing the new nomenclature will be skipped in further processing, and a ‘csv’ version of the ‘nomtab.rns’ will be copied to the ‘waiting room’ for editing.

Only fields containing NA values require editing in the ‘nomtab’ ‘csv’.

Examine the header nomenclature for the example data below. Only the columns that contain NA values (highlighted in blue in the rendered vignette) must be edited.
| uz_station | location | station | stnd_title | logger_type | logger_title | uz_record_interval_type | uz_record_interval | record_interval_type | record_interval | uz_table_name | table_name |
|---|---|---|---|---|---|---|---|---|---|---|---|
| Plot Title: Coastal Cashews | mcp | coastal_cashews_rn | mcp_coastal_cashews_rn | hobo_pendant | NA | event_based | discnt | event_based | discnt | hobo_data | raw_rain |
| Plot Title: Mseleni TC | mcp | mabasa_tc_rn | mcp_mabasa_tc_rn | hobo_pendant | NA | event_based | discnt | event_based | discnt | hobo_data | raw_rain |
| Manz_Office | mcp | manz_office_rn | mcp_manz_office_rn | hobo_pendant | NA | event_based | discnt | event_based | discnt | hobo_data | raw_rain |
| Plot Title: Sileza | mcp | sileza_camp_rn | mcp_sileza_camp_rn | hobo_pendant | NA | event_based | discnt | event_based | discnt | hobo_data | raw_rain |
| Plot_Title_Coastal_Cashews | mcp | coastal_cashews_rn | mcp_coastal_cashews_rn | hobo_pendant | NA | event_based | discnt | event_based | discnt | hobo_data | raw_rain |
| Plot_Title_Coastal_cashews | mcp | coastal_cashews_rn | mcp_coastal_cashews_rn | hobo_pendant | NA | event_based | discnt | event_based | discnt | hobo_data | raw_rain |
| Plot_Title_Manz_Office | mcp | manz_office_rn | mcp_manz_office_rn | hobo_pendant | NA | event_based | discnt | event_based | discnt | hobo_data | raw_rain |
| Plot_Title_Mseleni_TC | mcp | manz_office_rn | mcp_manz_office_rn | hobo_pendant | NA | event_based | discnt | event_based | discnt | hobo_data | raw_rain |
| Plot_Title_Sileza_Camp | mcp | sileza_camp_rn | mcp_sileza_camp_rn | hobo_pendant | NA | event_based | discnt | event_based | discnt | hobo_data | raw_rain |
| Plot_Title_Sileza_camp | mcp | sileza_camp_rn | mcp_sileza_camp_rn | hobo_pendant | NA | event_based | discnt | event_based | discnt | hobo_data | raw_rain |
| Plot_title_Manz_Office | mcp | manz_office_rn | mcp_manz_office_rn | hobo_pendant | NA | event_based | discnt | event_based | discnt | hobo_data | raw_rain |
| Plot_title_Mseleni_TC | mcp | mabasa_tc_rn | mcp_mabasa_tc_rn | hobo_pendant | NA | event_based | discnt | event_based | discnt | hobo_data | raw_rain |
| Sileza_Camp | mcp | sileza_camp_rn | mcp_sileza_camp_rn | hobo_pendant | NA | event_based | discnt | event_based | discnt | hobo_data | raw_rain |

From the table above we see that we are dealing with four different rain gauges or ‘stations’. However, in the raw data there are multiple synonyms for each station’s standard title. The supplied ‘table name’ (towards the right of the table) will be the name of the table for the associated raw data in an ‘ipayipi’ station file; the standard-title field will form the file name of the station. Standardising these fields as in the table above is important for further processing of the data.

Once the NA values of the above fields have been populated, the edited ‘csv’ will be imbibed into the pipeline structure when rerunning header_sts(pipe_house); this function imbibes the most recently updated ‘csv’ nomenclature table from the ‘wait_room’ into the pipeline and standardises the header nomenclature.

Tip: Double-check your nomenclature standards in the ‘csv’ file before running header_sts() again!
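The NA fields can also be filled directly in R. A self-contained sketch with a toy (hypothetical) nomenclature table, not the full ‘nomtab’ structure:

```r
# toy nomenclature table with unpopulated (NA) standard fields
nt <- data.frame(
  uz_station = c("Plot_Title_Sileza_Camp", "Sileza_Camp"),
  station    = NA_character_,
  stnd_title = NA_character_
)
# both synonyms map to one standard station name
nt$station    <- "sileza_camp_rn"
nt$stnd_title <- paste0("mcp_", nt$station)
```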

In step with good tidy-data standards, keep nomenclature in ‘snake case’ with no special characters (bar the useful underscore).
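A small base-R helper along these lines can coerce arbitrary header strings to snake case (an illustrative function, not part of ipayipi):

```r
# coerce an arbitrary header string to snake case
to_snake <- function(x) {
  x <- gsub("[^A-Za-z0-9]+", "_", x) # runs of non-alphanumerics -> underscore
  x <- gsub("^_+|_+$", "", x)        # trim leading/trailing underscores
  tolower(x)
}
to_snake("Plot Title: Coastal Cashews") # "plot_title_coastal_cashews"
```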

Standardising phenomena metadata follows a similar process to header-data standardisation. If the phenomena standards have been described and there is a ‘phentab.rps’ in the ‘waiting room’, running the code below updates all files’ phenomena details.

# standardise phenomena
phenomena_sts(pipe_house = pipe_house)

Tip: Setting the remove_dups argument to TRUE in phenomena_sts() allows the user to interactively remove duplicate phenomena through a prompt interface, on the off chance logger exports have included these. Any interactive (prompt-line) functionality is only possible if processing is sequential rather than parallel. See the section on parallel processing at the end.

If there is no ‘phenomena table’ (‘phentab.rps’), one will be generated, and the NA values in its ‘csv’ copy need to be described. The following fields in the ‘csv’ phentab must be populated:

Additional fields that are not mandatory include:

If an ‘f_convert’ factor (scroll right on the table below) is applied to a phenomenon, the standardised units must differ from the unstandardised units (uz_units) in the phenomena table; the standardised units describe the data after conversion. This ensures that phenomena that are appended have consistent units.

| phen_name_full | phen_type | phen_name | units | measure | offset | var_type | uz_phen_name | uz_units | uz_measure | f_convert | sensor_id | notes |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| Interference event: coupler attached | Interference event | attached | event | event | NA | fac | Coupler Attached | no_spec | no_spec | NA | NA | NA |
| Interference event: coupler attached | Interference event | attached | event | event | NA | fac | Coupler Attached (LGR S/N: 20102559) | no_spec | no_spec | NA | NA | NA |
| Interference event: bad battery | Interference event | bad_battery | event | event | NA | fac | Bad Battery (LGR S/N: 20116793) | no_spec | no_spec | NA | NA | NA |
| Interference event: host connected | Interference event | connected | event | event | NA | fac | Host Connected | no_spec | no_spec | NA | NA | NA |
| Interference event: host connected | Interference event | connected | event | event | NA | fac | Host Connected (LGR S/N: 20102559) | no_spec | no_spec | NA | NA | NA |
| Interference event: coupler detached | Interference event | detached | event | event | NA | fac | Coupler Detached | no_spec | no_spec | NA | NA | NA |
| Interference event: end of file | Interference event | end_of_file | event | event | NA | fac | End Of File | no_spec | no_spec | NA | NA | NA |
| Precipitation: total rainfall | Precipitation | rain_cumm | mm | cumm | 0 | num | Event, mm (LGR S/N: 20112169, SEN S/N: 20112169) | no_spec | no_spec | NA | NA | NA |
| Precipitation: total rainfall | Precipitation | rain_cumm | mm | cumm | 0 | num | Rainfall (mm) #20102559 | no_spec | no_spec | NA | NA | NA |
| Interference event: stopped | Interference event | stopped | event | event | NA | fac | Stopped (LGR S/N: 20102559) | no_spec | no_spec | NA | NA | NA |
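For example (hypothetical values, not from the example data), a gauge logged in inches would take an ‘f_convert’ factor of 25.4, with the standardised units then given as millimetres:

```r
# unstandardised rainfall logged in inches (hypothetical values)
uz_rain_in <- c(0.01, 0.02, 0.04)
f_convert  <- 25.4 # inches -> millimetres
rain_mm    <- uz_rain_in * f_convert
rain_mm
```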

After filling in details to replace the NA values (only in the columns highlighted blue in the rendered vignette!), rerun phenomena_sts(pipe_house) to imbibe the updated phenomena descriptions and update the logger data.

Standardised data files get transferred to the ‘nomenclature vetted’ directory (the ‘nomvet_room’) using the function below. After being transferred, files in the waiting room (except the nomtab and phentab standards) are automatically removed.

Note how the file extensions in the ‘waiting room’ change as files are standardised. Successfully imbibed files have an ‘ipr’ extension. Post header standardisation the extension changes to ‘iph’; the extension becomes ‘ipi’ when phenomena synonyms are corrected.

# move standardised files to a storage directory
transfer_sts_files(pipe_house)

Archiving raw data files: before removal of raw, unstandardised files, and if there is a ‘raw_room’ directory in the pipeline working directory, raw input data files are copied to this directory and filed in folders by year and month of the last date of recording. This is done by the imbibe_raw_batch() function.

Notes on the phenomena standards: apart from the usual ‘rain_cumm’ phenomenon, where the cumulative total of rainfall ‘tips’ is summed, hobo logger systems document ‘interference’ events. Interference events denote when loggers were ‘interacted’ with in the field, e.g., ‘host connected’ or ‘attached’ for when a download cable is attached to the logger. This information will be useful for processing the data later on.
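As an illustration of why these columns matter, interference events can be separated from rainfall records with plain subsetting (a toy slice mimicking the raw_rain structure, with made-up values):

```r
# toy slice mimicking the raw_rain structure
raw_rain <- data.frame(
  date_time = as.POSIXct(c("2019-01-14 16:17:19", "2019-01-14 16:18:30",
    "2019-01-16 06:23:18"), tz = "UTC"),
  connected = c("", "Logged", ""),
  rain_cumm = c(0, NA, 0.254)
)
interference <- raw_rain[raw_rain$connected != "", ]   # logger interactions
rain_only    <- raw_rain[!is.na(raw_rain$rain_cumm), ] # actual tip records
```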

Make station files

The append_station_batch() function updates station files in the ‘ipip_room’ with files from the ‘nomvet_room’. append_station_batch() optimises ‘non-NA’ data retention between overlapping ‘date-time’ stamps: an important consideration for Hobo data-logger exports, which can differ depending on the computing-system environment.

append_station_batch(pipe_house)
## |============== Updating 0 stations with 0 standarised files ... ==============|
## |=============================  stations appended  ============================|

Now that station files have been generated, let’s examine what has been done so far. Station files are maintained in the ‘ipip_room’ of the pipeline’s folder structure.

# list station files in the ipip directory
sflist <- dta_list(
  input_dir = pipe_house$ipip_room, # search directory
  file_ext = ".ipip" # note the station's default file extension
)

# check what stations are in the ipip room
print(sflist)
## [1] "mcp_coastal_cashews_rn.ipip" "mcp_mabasa_tc_rn.ipip"       "mcp_manz_office_rn.ipip"     "mcp_sileza_camp_rn.ipip"
# read in the station file
sf <- readRDS(file.path(pipe_house$ipip_room, sflist[1]))

# names of the tables stored in the station file
names(sf)
##  [1] "data_summary"       "dt_1_days_agg"      "dt_1_days_agg_saws" "dt_1_hours_agg"     "dt_1_months_agg"    "dt_1_years_agg"     "dt_5_mins_agg"      "dt_pseudo_event"    "dt_rain"            "dt_rain_se"        
## [11] "f_params"           "gaps"               "logg_interfere"     "meta_events"        "phen_data_summary"  "phens"              "phens_dt"           "pipe_seq"           "pseudo_events"      "raw_rain"

‘Raw’ data

Our station file has one ‘raw’ data table, the ‘raw_rain’ table, composed of the event data below.

# a look at the first few data rows
head(sf$raw_rain)
##       id           date_time attached connected detached end_of_file rain_cumm stopped
##    <num>              <POSc>   <fctr>    <fctr>   <fctr>      <fctr>     <num>  <fctr>
## 1:     1 2019-01-14 16:17:19                                             0.000        
## 2:     2 2019-01-14 16:18:30                      Logged                    NA        
## 3:     3 2019-01-16 06:23:18                                             0.254        
## 4:     4 2019-01-16 06:28:15                                             0.508        
## 5:     5 2019-01-16 06:36:08                                             0.762        
## 6:     6 2019-01-16 11:24:44                                             1.016

The ‘raw’ data-header summary

This table contains summary information on the origin of each data file used to make up the station file.
dsid file_format uz_station location station stnd_title start_dttm end_dttm logger_type logger_title logger_sn logger_os logger_program_name logger_program_sig uz_record_interval_type uz_record_interval record_interval_type record_interval dttm_inc_exc dttm_ie_chng uz_table_name table_name nomvet_name file_origin
1 hobo_csv_export Plot_Title_Coastal_Cashews mcp coastal_cashews_rn mcp_coastal_cashews_rn 2019-01-14 16:17:19 2019-03-13 14:07:53 hobo_pendant NA 20116788 NA NA NA event_based discnt event_based discnt TRUE FALSE hobo_data raw_rain mcp_coastal_cashews_rn_discnt_20190114-20190313__1.ipi NA
2 hobo_csv_export Plot_Title_Coastal_Cashews mcp coastal_cashews_rn mcp_coastal_cashews_rn 2019-01-14 16:17:19 2019-04-11 13:50:38 hobo_pendant NA 20116788 NA NA NA event_based discnt event_based discnt TRUE FALSE hobo_data raw_rain mcp_coastal_cashews_rn_discnt_20190114-20190411__1.ipi NA
3 hobo_csv_export Plot_Title_Coastal_Cashews mcp coastal_cashews_rn mcp_coastal_cashews_rn 2019-01-14 16:17:19 2019-05-15 09:27:29 hobo_pendant NA 20116788 NA NA NA event_based discnt event_based discnt TRUE FALSE hobo_data raw_rain mcp_coastal_cashews_rn_discnt_20190114-20190515__1.ipi NA
4 hobo_csv_export Plot_Title_Coastal_Cashews mcp coastal_cashews_rn mcp_coastal_cashews_rn 2019-01-14 16:17:19 2019-07-09 09:03:39 hobo_pendant NA 20116788 NA NA NA event_based discnt event_based discnt TRUE FALSE hobo_data raw_rain mcp_coastal_cashews_rn_discnt_20190114-20190709__1.ipi NA
5 hobo_csv_export Plot_Title_Coastal_Cashews mcp coastal_cashews_rn mcp_coastal_cashews_rn 2019-07-09 10:05:04 2019-08-13 15:21:31 hobo_pendant NA 20116788 NA NA NA event_based discnt event_based discnt TRUE FALSE hobo_data raw_rain mcp_coastal_cashews_rn_discnt_20190709-20190813__1.ipi NA
6 hobo_csv_export Plot_Title_Coastal_Cashews mcp coastal_cashews_rn mcp_coastal_cashews_rn 2019-08-13 15:33:12 2019-09-11 11:43:00 hobo_pendant NA 20116788 NA NA NA event_based discnt event_based discnt TRUE FALSE hobo_data raw_rain mcp_coastal_cashews_rn_discnt_20190813-20190911__1.ipi NA
7 hobo_csv_export Plot_Title_Coastal_Cashews mcp coastal_cashews_rn mcp_coastal_cashews_rn 2019-08-13 15:33:12 2019-10-09 08:53:19 hobo_pendant NA 20116788 NA NA NA event_based discnt event_based discnt TRUE FALSE hobo_data raw_rain mcp_coastal_cashews_rn_discnt_20190813-20191009__1.ipi NA
8 hobo_csv_export Plot_Title_Coastal_Cashews mcp coastal_cashews_rn mcp_coastal_cashews_rn 2019-08-13 15:33:12 2019-11-13 09:23:40 hobo_pendant NA 20116788 NA NA NA event_based discnt event_based discnt TRUE FALSE hobo_data raw_rain mcp_coastal_cashews_rn_discnt_20190813-20191113__1.ipi NA
9 hobo_csv_export Plot_Title_Coastal_Cashews mcp coastal_cashews_rn mcp_coastal_cashews_rn 2019-08-13 15:33:12 2019-12-12 09:57:13 hobo_pendant NA 20116788 NA NA NA event_based discnt event_based discnt TRUE FALSE hobo_data raw_rain mcp_coastal_cashews_rn_discnt_20190813-20191212__1.ipi NA
10 hobo_csv_export Plot_Title_Coastal_Cashews mcp coastal_cashews_rn mcp_coastal_cashews_rn 2020-01-15 10:29:41 2020-02-14 12:21:12 hobo_pendant NA 20116788 NA NA NA event_based discnt event_based discnt TRUE FALSE hobo_data raw_rain mcp_coastal_cashews_rn_discnt_20200115-20200214__1.ipi NA
11 hobo_csv_export Plot_Title_Coastal_Cashews mcp coastal_cashews_rn mcp_coastal_cashews_rn 2020-01-15 10:29:41 2020-03-09 15:58:07 hobo_pendant NA 20116788 NA NA NA event_based discnt event_based discnt TRUE FALSE hobo_data raw_rain mcp_coastal_cashews_rn_discnt_20200115-20200309__1.ipi NA
12 hobo_csv_export Plot_Title_Coastal_Cashews mcp coastal_cashews_rn mcp_coastal_cashews_rn 2020-03-09 16:59:58 2020-07-09 10:04:38 hobo_pendant NA 20116788 NA NA NA event_based discnt event_based discnt TRUE FALSE hobo_data raw_rain mcp_coastal_cashews_rn_discnt_20200309-20200709__1.ipi NA
13 hobo_csv_export Plot_Title_Coastal_Cashews mcp coastal_cashews_rn mcp_coastal_cashews_rn 2020-03-09 16:59:58 2020-08-14 13:33:18 hobo_pendant NA 20116788 NA NA NA event_based discnt event_based discnt TRUE FALSE hobo_data raw_rain mcp_coastal_cashews_rn_discnt_20200309-20200814__1.ipi NA
14 hobo_csv_export Plot_Title_Coastal_Cashews mcp coastal_cashews_rn mcp_coastal_cashews_rn 2020-03-09 16:59:58 2020-09-11 13:30:58 hobo_pendant NA 20116788 NA NA NA event_based discnt event_based discnt TRUE FALSE hobo_data raw_rain mcp_coastal_cashews_rn_discnt_20200309-20200911__1.ipi NA
15 hobo_csv_export Plot_Title_Coastal_Cashews mcp coastal_cashews_rn mcp_coastal_cashews_rn 2020-03-09 16:59:58 2020-10-15 10:55:22 hobo_pendant NA 20116788 NA NA NA event_based discnt event_based discnt TRUE FALSE hobo_data raw_rain mcp_coastal_cashews_rn_discnt_20200309-20201015__1.ipi NA
16 hobo_csv_export Plot_Title_Coastal_Cashews mcp coastal_cashews_rn mcp_coastal_cashews_rn 2020-03-09 16:59:58 2020-11-19 13:37:47 hobo_pendant NA 20116788 NA NA NA event_based discnt event_based discnt TRUE FALSE hobo_data raw_rain mcp_coastal_cashews_rn_discnt_20200309-20201119__1.ipi NA
17 hobo_csv_export Plot_Title_Coastal_Cashews mcp coastal_cashews_rn mcp_coastal_cashews_rn 2020-03-09 16:59:58 2020-12-09 14:23:29 hobo_pendant NA 20116788 NA NA NA event_based discnt event_based discnt TRUE FALSE hobo_data raw_rain mcp_coastal_cashews_rn_discnt_20200309-20201209__1.ipi NA
18 hobo_csv_export Plot_Title_Coastal_Cashews mcp coastal_cashews_rn mcp_coastal_cashews_rn 2020-03-09 16:59:58 2021-02-11 10:23:06 hobo_pendant NA 20116788 NA NA NA event_based discnt event_based discnt TRUE FALSE hobo_data raw_rain mcp_coastal_cashews_rn_discnt_20200309-20210211__1.ipi NA
19 hobo_csv_export Plot_Title_Coastal_Cashews mcp coastal_cashews_rn mcp_coastal_cashews_rn 2021-02-11 11:12:10 2021-03-09 11:06:52 hobo_pendant NA 20116788 NA NA NA event_based discnt event_based discnt TRUE FALSE hobo_data raw_rain mcp_coastal_cashews_rn_discnt_20210211-20210309__1.ipi NA
20 hobo_csv_export Plot_Title_Coastal_Cashews mcp coastal_cashews_rn mcp_coastal_cashews_rn 2021-02-11 11:12:10 2021-04-13 12:36:46 hobo_pendant NA 20116788 NA NA NA event_based discnt event_based discnt TRUE FALSE hobo_data raw_rain mcp_coastal_cashews_rn_discnt_20210211-20210413__1.ipi NA
21 hobo_csv_export Plot_Title_Coastal_Cashews mcp coastal_cashews_rn mcp_coastal_cashews_rn 2021-02-11 11:12:10 2021-05-14 15:30:17 hobo_pendant NA 20116788 NA NA NA event_based discnt event_based discnt TRUE FALSE hobo_data raw_rain mcp_coastal_cashews_rn_discnt_20210211-20210514__1.ipi NA
22 hobo_csv_export Plot_Title_Coastal_Cashews mcp coastal_cashews_rn mcp_coastal_cashews_rn 2021-02-11 11:12:10 2021-06-17 10:07:51 hobo_pendant NA 20116788 NA NA NA event_based discnt event_based discnt TRUE FALSE hobo_data raw_rain mcp_coastal_cashews_rn_discnt_20210211-20210617__1.ipi NA
23 hobo_csv_export Plot_Title_Coastal_Cashews mcp coastal_cashews_rn mcp_coastal_cashews_rn 2021-02-11 11:12:10 2021-07-20 11:39:31 hobo_pendant NA 20116788 NA NA NA event_based discnt event_based discnt TRUE FALSE hobo_data raw_rain mcp_coastal_cashews_rn_discnt_20210211-20210720__1.ipi NA
24 hobo_csv_export Plot_Title_Coastal_Cashews mcp coastal_cashews_rn mcp_coastal_cashews_rn 2021-02-11 11:12:10 2021-08-12 09:26:17 hobo_pendant NA 20116788 NA NA NA event_based discnt event_based discnt TRUE FALSE hobo_data raw_rain mcp_coastal_cashews_rn_discnt_20210211-20210812__1.ipi NA
25 hobo_csv_export Plot_Title_Coastal_Cashews mcp coastal_cashews_rn mcp_coastal_cashews_rn 2021-02-11 11:12:10 2021-09-15 15:27:21 hobo_pendant NA 20116788 NA NA NA event_based discnt event_based discnt TRUE FALSE hobo_data raw_rain mcp_coastal_cashews_rn_discnt_20210211-20210915__1.ipi NA
26 hobo_csv_export Plot_Title_Coastal_Cashews mcp coastal_cashews_rn mcp_coastal_cashews_rn 2021-02-11 11:12:10 2021-10-14 11:13:36 hobo_pendant NA 20116788 NA NA NA event_based discnt event_based discnt TRUE FALSE hobo_data raw_rain mcp_coastal_cashews_rn_discnt_20210211-20211014__1.ipi NA
27 hobo_csv_export Plot_Title_Coastal_Cashews mcp coastal_cashews_rn mcp_coastal_cashews_rn 2021-11-11 11:45:32 2021-12-14 14:47:35 hobo_pendant NA 20116788 NA NA NA event_based discnt event_based discnt TRUE FALSE hobo_data raw_rain mcp_coastal_cashews_rn_discnt_20211111-20211214__1.ipi NA
28 hobo_csv_export Plot_Title_Coastal_Cashews mcp coastal_cashews_rn mcp_coastal_cashews_rn 2021-11-11 11:45:32 2022-01-13 13:18:25 hobo_pendant NA 20116788 NA NA NA event_based discnt event_based discnt TRUE FALSE hobo_data raw_rain mcp_coastal_cashews_rn_discnt_20211111-20220113__1.ipi NA
29 hobo_csv_export Plot_Title_Coastal_Cashews mcp coastal_cashews_rn mcp_coastal_cashews_rn 2021-11-11 11:45:32 2022-02-10 07:54:55 hobo_pendant NA 20116788 NA NA NA event_based discnt event_based discnt TRUE FALSE hobo_data raw_rain mcp_coastal_cashews_rn_discnt_20211111-20220210__1.ipi NA
30 hobo_csv_export Plot_Title_Coastal_Cashews mcp coastal_cashews_rn mcp_coastal_cashews_rn 2021-11-11 11:45:32 2022-03-10 17:19:55 hobo_pendant NA 20116788 NA NA NA event_based discnt event_based discnt TRUE FALSE hobo_data raw_rain mcp_coastal_cashews_rn_discnt_20211111-20220310__1.ipi NA
31 hobo_csv_export Plot_Title_Coastal_cashews mcp coastal_cashews_rn mcp_coastal_cashews_rn 2021-11-11 11:45:32 2022-05-17 14:40:56 hobo_pendant NA hobo_data NA NA NA event_based discnt event_based discnt TRUE FALSE hobo_data raw_rain mcp_coastal_cashews_rn_discnt_20211111-20220517__1.ipi NA
32 hobo_csv_export Plot_Title_Coastal_Cashews mcp coastal_cashews_rn mcp_coastal_cashews_rn 2022-05-17 14:56:22 2022-07-13 10:43:59 hobo_pendant NA 20116788 NA NA NA event_based discnt event_based discnt TRUE FALSE hobo_data raw_rain mcp_coastal_cashews_rn_discnt_20220517-20220713__1.ipi NA
33 hobo_csv_export Plot_Title_Coastal_Cashews mcp coastal_cashews_rn mcp_coastal_cashews_rn 2022-05-17 14:56:22 2022-08-10 12:37:41 hobo_pendant NA 20116788 NA NA NA event_based discnt event_based discnt TRUE FALSE hobo_data raw_rain mcp_coastal_cashews_rn_discnt_20220517-20220810__1.ipi NA
34 hobo_csv_export Plot_Title_Coastal_Cashews mcp coastal_cashews_rn mcp_coastal_cashews_rn 2022-05-17 14:56:22 2022-09-15 17:37:15 hobo_pendant NA 20116788 NA NA NA event_based discnt event_based discnt TRUE FALSE hobo_data raw_rain mcp_coastal_cashews_rn_discnt_20220517-20220915__1.ipi NA
35 hobo_csv_export Plot_Title_Coastal_Cashews mcp coastal_cashews_rn mcp_coastal_cashews_rn 2022-05-17 14:56:22 2022-10-13 14:25:31 hobo_pendant NA 20116788 NA NA NA event_based discnt event_based discnt TRUE FALSE hobo_data raw_rain mcp_coastal_cashews_rn_discnt_20220517-20221013__1.ipi NA
36 hobo_csv_export Plot_Title_Coastal_Cashews mcp coastal_cashews_rn mcp_coastal_cashews_rn 2022-05-17 14:56:22 2022-11-07 13:50:14 hobo_pendant NA 20116788 NA NA NA event_based discnt event_based discnt TRUE FALSE hobo_data raw_rain mcp_coastal_cashews_rn_discnt_20220517-20221107__1.ipi NA
37 hobo_csv_export Plot_Title_Coastal_Cashews mcp coastal_cashews_rn mcp_coastal_cashews_rn 2022-05-17 14:56:22 2022-12-13 15:41:24 hobo_pendant NA 20116788 NA NA NA event_based discnt event_based discnt TRUE FALSE hobo_data raw_rain mcp_coastal_cashews_rn_discnt_20220517-20221213__1.ipi NA
38 hobo_csv_export Plot_Title_Coastal_Cashews mcp coastal_cashews_rn mcp_coastal_cashews_rn 2022-12-13 16:29:28 2023-01-11 17:03:29 hobo_pendant NA 20116788 NA NA NA event_based discnt event_based discnt TRUE FALSE hobo_data raw_rain mcp_coastal_cashews_rn_discnt_20221213-20230111__1.ipi NA

The phenomena table: ‘phens’

A station-file version of the phenomena standards. Note that each phenomenon variation/synonym has a unique identifier (‘phid’) within the scope of this station.
| phid | phen_name_full | phen_type | phen_name | units | measure | offset | var_type | uz_phen_name | uz_units | uz_measure | f_convert | sensor_id | notes | table_name |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 1 | Interference event: coupler attached | Interference event | attached | event | event | NA | fac | Coupler Attached (LGR S/N: 20116788) | no_spec | no_spec | NA | NA | NA | raw_rain |
| 2 | Interference event: coupler detached | Interference event | detached | event | event | NA | fac | Coupler Detached (LGR S/N: 20116788) | no_spec | no_spec | NA | NA | NA | raw_rain |
| 3 | Interference event: end of file | Interference event | end_of_file | event | event | NA | fac | End Of File (LGR S/N: 20116788) | no_spec | no_spec | NA | NA | NA | raw_rain |
| 4 | Interference event: host connected | Interference event | connected | event | event | NA | fac | Host Connected (LGR S/N: 20116788) | no_spec | no_spec | NA | NA | NA | raw_rain |
| 11 | Interference event: stopped | Interference event | stopped | event | event | NA | fac | Stopped (LGR S/N: 20116788) | no_spec | no_spec | NA | NA | NA | raw_rain |
| 5 | Precipitation: total rainfall | Precipitation | rain_cumm | mm | cumm | 0 | num | Rainfall, mm (LGR S/N: 20116788, SEN S/N: 20116788) | no_spec | no_spec | NA | NA | NA | raw_rain |
Note the ‘phid’ link in the temporal phenomena summary below.
| phid | phen_name | start_dttm | end_dttm | table_name |
|---|---|---|---|---|
| 1 | attached | 2019-01-14 16:17:19 | 2023-01-11 17:03:29 | raw_rain |
| 4 | connected | 2019-01-14 16:17:19 | 2023-01-11 17:03:29 | raw_rain |
| 2 | detached | 2019-01-14 16:17:19 | 2023-01-11 17:03:29 | raw_rain |
| 3 | end_of_file | 2019-01-14 16:17:19 | 2023-01-11 17:03:29 | raw_rain |
| 5 | rain_cumm | 2019-01-14 16:17:19 | 2023-01-11 17:03:29 | raw_rain |
| 11 | stopped | 2019-01-14 16:17:19 | 2023-01-11 17:03:29 | raw_rain |

Phenomena append notes: when appending phenomena tables, if there is overlapping data, ‘ipayipi’ will examine each overlapping phenomenon series in turn and overwrite either the new data (e.g., additions to a station file) or the station-file data. The phenomena data summary keeps a temporal record of how phenomena have been appended. Maintaining these records helps data processing down the pipeline.
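In spirit (this is not ipayipi’s internal implementation), preferring non-NA values across two overlapping versions of a series looks like:

```r
# two overlapping versions of the same series; prefer non-NA values
old_dta  <- c(0.0, NA, 0.508, NA)
new_dta  <- c(0.0, 0.254, NA, 0.762)
appended <- ifelse(is.na(new_dta), old_dta, new_dta)
appended # 0.000 0.254 0.508 0.762
```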

Handling big data: to minimise memory usage when processing station files, they are ‘decompressed’ into temporary station files that by default last the duration of an R session. The temporary files are indexed/chunked/read by date-time. While this reduces memory allocation for further processing, there is an initial time cost to chunk data when a station is first used in an R session under this default setting.
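The chunking idea can be sketched in base R: split a table by a date-time index and store each chunk as its own file in a temporary directory (an illustration only, not ipayipi’s chunk format):

```r
# chunk a toy table by year-month and store each chunk as its own file
tmp <- file.path(tempdir(), "sf_chunks")
dir.create(tmp, showWarnings = FALSE)
x <- data.frame(
  date_time = as.POSIXct(c("2019-01-14", "2019-01-20", "2019-02-02"),
    tz = "UTC"),
  rain_mm = c(0.254, 0.508, 0.254)
)
chunks <- split(x, format(x$date_time, "%Y_%m")) # index by date-time
invisible(mapply(
  function(d, nm) saveRDS(d, file.path(tmp, paste0(nm, ".rds"))),
  chunks, names(chunks)
))
list.files(tmp)
```

Only the chunks covering a requested date-time range need to be read back, which is what keeps memory use low for long records.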

Integrating field-event data

Event metadata collected in the field (checks on the physical rain gauges, for example) is important to integrate into the data pipeline. Below we import event metadata from a database of field records and save it in the pipeline’s data-processing directory.

## update station metadata ----
rl <- googlesheets4::read_sheet(
  ss = "1b-dorAHYT21xa8vWS8tPu7M9NIkc9lrt2yYPVEnxX_o",
  sheet = "mcp_event_data_generic",
  range = "A1:Q341",
  col_names = TRUE,
  col_types = c("ilTccccdddcllcTTc"),
  na = ""
)
names(rl)[3] <- "date_time"
meta_read(meta_file = rl, col_types = "ilTccccdddcllcTTc",
  output_dir = pipe_house$ipip_room, output_name = "event_db"
)

Event metadata then gets pushed to appropriate stations using the function below.

meta_to_station(pipe_house = pipe_house, meta_file = "event_db")

Missing data—gaps

Checking for data ‘gaps’ in continuous data streams is straightforward: just highlight the missing/NA values. With discontinuous or event-based data it is more nuanced. gap_eval_batch() identifies gap periods, specifically where a logger was not recording, in both continuous and discontinuous time-series data. For discontinuous data streams, a gap is present where the duration between a logger-host connection (a download in the field) and the next start of logging (derived from the interference events above) exceeds a threshold; here the threshold (‘gap_problem_thresh_s’) is 21600 seconds, i.e., six hours (see the table below).

gap_eval_batch(pipe_house)
sf <- readRDS(file.path(pipe_house$ipip_room, sflist[4]))[["gaps"]]
print(sf)
##      gid    eid gap_type   phen table_name           gap_start             gap_end    dt_diff_s gap_problem_thresh_s problem_gap  notes
##    <int> <lgcl>   <char> <char>     <char>              <POSc>              <POSc>   <difftime>                <num>      <lgcl> <char>
## 1:     1     NA     auto logger   raw_rain 2021-03-30 14:21:10 2021-03-31 11:23:15   75725 secs                21600        TRUE   <NA>
## 2:     2     NA     auto logger   raw_rain 2023-06-09 09:59:03 2023-08-10 09:02:45 5353422 secs                21600        TRUE   <NA>
## 3:     3     NA     auto logger   raw_rain 2024-04-17 08:32:36 2024-05-07 10:50:46 1736290 secs                21600        TRUE   <NA>
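The gap computation itself can be illustrated with a small standalone sketch (generic data.table code, not the internal gap_eval_batch() implementation): any interval between successive records that exceeds the threshold (21600 s in the table above) becomes a gap running from the last record before the break to the first record after it.

```r
library(data.table)

# toy record times with one long break between records
dttm <- as.POSIXct(c(
  "2023-06-09 09:00:00", "2023-06-09 09:59:03",
  "2023-08-10 09:02:45", "2023-08-10 10:00:00"
), tz = "UTC")

gap_problem_thresh_s <- 6 * 60 * 60  # six hours, in seconds

dt <- data.table(dttm = sort(dttm))
# seconds elapsed since the previous record (zero for the first record)
dt[, dt_diff_s := c(0, as.numeric(diff(dttm), units = "secs"))]

# a 'gap' runs from the last record before the break to the next record
gaps <- dt[dt_diff_s > gap_problem_thresh_s,
  .(gap_start = dttm - dt_diff_s, gap_end = dttm, dt_diff_s)
]
print(gaps)
```

The single flagged interval here reproduces the duration of gap 2 in the table above (5353422 s).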

In the graph below, only the Sileza rainfall station has data gaps that exceeded the gap threshold. Hover over the graph to interact with it.

p <- dta_availability(pipe_house = pipe_house, plot_tbls = "raw_rain")
ggplotly(p$plt, tooltip = "text")

Gaps are highlighted in dark red.

The gap_eval_batch() function has detailed help files describing how to structure event metadata, which is used when ‘gaps’ are defined. These gaps will need to be imputed with appropriate data in further processing steps not covered here.

Process data

Next, data are processed using a custom ‘pipe sequence’, which details processing operations stage by stage and step by step. The pipe-sequence object pipe_seq is read into the R environment by sourcing its script, as below. This script serves as a reference for how the data are processed. More on the pipe sequence at the end of this document.

source('ipayipi_vignettes/rainfall_eg/r/pipe_seq/txs_tb.r')
dt_process_batch(pipe_house = pipe_house, pipe_seq = pipe_seq)

Note the ‘dt_’ prefix of processed data tables, and the embedded pipe_seq table, in the updated station file below. There is also a table of ‘pseudo events’, or false tips.

# read in the station file
sf <- readRDS(file.path(pipe_house$ipip_room, sflist[1]))
# names of the tables stored in the station file
names(sf)
##  [1] "data_summary"       "dt_1_days_agg"      "dt_1_days_agg_saws" "dt_1_hours_agg"     "dt_1_months_agg"    "dt_1_years_agg"     "dt_5_mins_agg"      "dt_pseudo_event"    "dt_rain"            "dt_rain_se"        
## [11] "f_params"           "gaps"               "logg_interfere"     "meta_events"        "phen_data_summary"  "phens"              "phens_dt"           "pipe_seq"           "pseudo_events"      "raw_rain"

Inspect data

Visualising the cumulative sum of phenomena is useful for examining rates of change and correlations between datasets.
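For example, per-station cumulative totals can be computed with data.table (a generic sketch on toy data in the long-format shape returned by dta_flat_pull() with wide = FALSE; not an ipayipi function):

```r
library(data.table)

# toy long-format rainfall data: date_time, station title, annual total
d <- data.table(
  date_time = rep(as.POSIXct(
    c("2021-01-01", "2022-01-01", "2023-01-01"), tz = "UTC"
  ), 2),
  stnd_title = rep(c("stn_a", "stn_b"), each = 3),
  rain_tot = c(724.916, 654.558, 704.850, 237.490, 635.762, NA)
)

# cumulative rainfall per station; treat NA as zero so the series continues
d[, rain_cumm := cumsum(fifelse(is.na(rain_tot), 0, rain_tot)),
  by = stnd_title
]
print(d)
```

The resulting ‘rain_cumm’ series can be plotted per station (e.g., with ggplot2) to compare rates of accumulation between gauges.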

Query data

Station data can be queried across or within pipelines using dta_flat_pull(). Data can be saved to file in ‘csv’ format — see below.

# wide format
# when writing to csv the default output directory is pipe_house$dta_out
dta_flat_pull(pipe_house = pipe_house,
  phen_name = "rain_tot", # name of the phenomena --- exact match
  tab_names = "dt_1_years_agg", # table within which to search for phen
  out_csv = TRUE,
  out_csv_preffix = "mcp"
)
## $dta
##     date_time mcp_coastal_cashews_rn mcp_mabasa_tc_rn mcp_manz_office_rn mcp_sileza_camp_rn
##        <POSc>                  <num>            <num>              <num>              <num>
## 1: 2018-01-01                     NA               NA            672.338             69.088
## 2: 2019-01-01                584.200               NA            976.376            584.454
## 3: 2020-01-01                606.298               NA           1700.276            523.494
## 4: 2021-01-01                768.604          237.490           2021.840            724.916
## 5: 2022-01-01                834.644          635.762           2328.672            654.558
## 6: 2023-01-01                  0.508               NA           2203.704            704.850
## 7: 2024-01-01                     NA               NA           1314.704            246.380
## 
## $time_interval
## dt_1_years_agg 
##      "1_years" 
## 
## $stations
## [1] "mcp_coastal_cashews_rn.ipip" "mcp_mabasa_tc_rn.ipip"       "mcp_manz_office_rn.ipip"     "mcp_sileza_camp_rn.ipip"    
## 
## $tab_names
## [1] "dt_1_years_agg"
## 
## $phen_name
## [1] "rain_tot"
## 
## $file_ext
## [1] ".ipip"
# long format
d <- dta_flat_pull(pipe_house = pipe_house, phen_name = "rain_tot",
  tab_names = "dt_1_years_agg", wide = FALSE
)
d$dta
##      date_time             stnd_title rain_tot
##         <POSc>                 <fctr>    <num>
##  1: 2018-01-01 mcp_coastal_cashews_rn       NA
##  2: 2019-01-01 mcp_coastal_cashews_rn  584.200
##  3: 2020-01-01 mcp_coastal_cashews_rn  606.298
##  4: 2021-01-01 mcp_coastal_cashews_rn  768.604
##  5: 2022-01-01 mcp_coastal_cashews_rn  834.644
##  6: 2023-01-01 mcp_coastal_cashews_rn    0.508
##  7: 2024-01-01 mcp_coastal_cashews_rn       NA
##  8: 2018-01-01       mcp_mabasa_tc_rn       NA
##  9: 2019-01-01       mcp_mabasa_tc_rn       NA
## 10: 2020-01-01       mcp_mabasa_tc_rn       NA
## 11: 2021-01-01       mcp_mabasa_tc_rn  237.490
## 12: 2022-01-01       mcp_mabasa_tc_rn  635.762
## 13: 2023-01-01       mcp_mabasa_tc_rn       NA
## 14: 2024-01-01       mcp_mabasa_tc_rn       NA
## 15: 2018-01-01     mcp_manz_office_rn  672.338
## 16: 2019-01-01     mcp_manz_office_rn  976.376
## 17: 2020-01-01     mcp_manz_office_rn 1700.276
## 18: 2021-01-01     mcp_manz_office_rn 2021.840
## 19: 2022-01-01     mcp_manz_office_rn 2328.672
## 20: 2023-01-01     mcp_manz_office_rn 2203.704
## 21: 2024-01-01     mcp_manz_office_rn 1314.704
## 22: 2018-01-01     mcp_sileza_camp_rn   69.088
## 23: 2019-01-01     mcp_sileza_camp_rn  584.454
## 24: 2020-01-01     mcp_sileza_camp_rn  523.494
## 25: 2021-01-01     mcp_sileza_camp_rn  724.916
## 26: 2022-01-01     mcp_sileza_camp_rn  654.558
## 27: 2023-01-01     mcp_sileza_camp_rn  704.850
## 28: 2024-01-01     mcp_sileza_camp_rn  246.380
##      date_time             stnd_title rain_tot

Visualise monthly anomalies

p <- plot_m_anomaly(pipe_house = pipe_house, phen_name = "rain_tot",
  phen_units = "mm"
)
p$plt[[3]]

p$plt[[4]]
## Warning: Removed 8 rows containing missing values or values outside the scale range (`geom_segment()`).

The rainfall ‘pipe_seq’

It’s not necessary to understand the detail of each function that builds the pipe_seq object below. However, for customising these functions, more information can be found in each function’s help-file description, e.g., type ?pipe_seq in your R console. The pipe_seq table from the station file processed above is shown below. Note that function parameters (‘f_params’) are described for each stage (‘dt_n’) and nested step (‘dtp_n’). The function for each stage’s step is given in the ‘f’ column. This pipe-sequence table makes it easier to inspect the data-processing pipeline. When this table was evaluated by dt_process_batch(), further parameters were produced for each respective function and stored in the station-file objects.

# using kableExtra
kbl(sf$pipe_seq) |> kable_paper("hover") |> kable_styling(font_size = 11) |>
  scroll_box(width = "100%", height = "180px")
dt_n dtp_n n f f_params input_dt output_dt time_interval start_dttm end_dttm
1 1 1 dt_harvest hsf_param_eval(hsf_table = “meta_events”) meta_events dt_pseudo_event discnt NA NA
1 2 1 dt_calc dt[event_type == ‘pseudo_events’, ] dt_pseudo_event dt_pseudo_event discnt NA NA
1 2 2 dt_calc [stnd_title == station, ] dt_pseudo_event dt_pseudo_event discnt NA NA
1 2 3 dt_calc [qa == TRUE, ] dt_pseudo_event dt_pseudo_event discnt NA NA
2 1 1 dt_harvest hsf_param_eval(hsf_table = “raw_rain”) raw_rain dt_rain_se discnt NA NA
2 2 1 dt_calc dt[, fktest := fifelse(.I == 1 | .I == .N, TRUE, FALSE)] dt_rain_se dt_rain_se discnt 2019-01-14 16:17:19 2023-01-11 17:03:29
2 2 2 dt_calc [fktest == TRUE, ] dt_rain_se dt_rain_se discnt 2019-01-14 16:17:19 2023-01-11 17:03:29
2 2 3 dt_calc [, rain_mm := rain_cumm, ipip(measure = “tot”, units = “mm”, var_type = “num”)] dt_rain_se dt_rain_se discnt 2019-01-14 16:17:19 2023-01-11 17:03:29
2 2 4 dt_calc [, rain_mm := 0] dt_rain_se dt_rain_se discnt 2019-01-14 16:17:19 2023-01-11 17:03:29
2 2 5 dt_calc [, .(date_time, rain_mm)] dt_rain_se dt_rain_se discnt 2019-01-14 16:17:19 2023-01-11 17:03:29
3 1 1 dt_harvest hsf_param_eval(hsf_table = “dt_rain_se”) dt_rain_se dt_rain_se discnt NA NA
3 2 1 dt_calc dt[, fktest := fifelse(.I == 1 | .I == .N, TRUE, FALSE)] dt_rain_se dt_rain_se discnt 2019-01-14 16:17:19 2023-01-11 17:03:29
3 2 2 dt_calc [fktest == TRUE, ] dt_rain_se dt_rain_se discnt 2019-01-14 16:17:19 2023-01-11 17:03:29
3 2 3 dt_calc [, rain_mm := 0] dt_rain_se dt_rain_se discnt 2019-01-14 16:17:19 2023-01-11 17:03:29
3 2 4 dt_calc [, .(date_time, rain_mm)] dt_rain_se dt_rain_se discnt 2019-01-14 16:17:19 2023-01-11 17:03:29
4 1 1 dt_harvest hsf_param_eval(hsf_table = “raw_rain”) raw_rain dt_rain discnt NA NA
4 2 1 dt_calc dt[!is.na(rain_cumm), ] dt_rain dt_rain discnt 2019-01-16 06:23:18 2023-01-08 13:43:52
4 2 2 dt_calc [, rain_diff := c(0, rain_cumm[2:.N] - rain_cumm[1:(.N - 1)]), ipip(measure = “tot”, units = “mm”, var_type = “num”)] dt_rain dt_rain discnt 2019-01-16 06:23:18 2023-01-08 13:43:52
4 2 3 dt_calc [rain_diff != 0, ] dt_rain dt_rain discnt 2019-01-16 06:23:18 2023-01-08 13:43:52
4 2 4 dt_calc [, t_lag := c(0, date_time[2:.N] - date_time[1:(.N - 1)]), ipip(measure = “tot”, units = “sec”, var_type = “num”)] dt_rain dt_rain discnt 2019-01-16 06:23:18 2023-01-08 13:43:52
4 3 1 dt_harvest hsf_param_eval(hsf_table = “logg_interfere”) logg_interfere dt_rain discnt NA NA
4 4 1 dt_join join_param_eval(fuzzy = c(0, 600), join = “left_join”) dt_rain dt_rain discnt 2019-01-16 06:23:18 2023-01-08 13:43:52
4 5 1 dt_harvest hsf_param_eval(hsf_table = “dt_pseudo_event”) dt_pseudo_event dt_rain discnt NA NA
4 6 1 dt_join join_param_eval(fuzzy = 0, join = “left_join”, y_key = c(“start_dttm”, “end_dttm”)) dt_rain dt_rain discnt 2019-01-16 06:23:18 2023-01-08 13:43:52
4 7 1 dt_calc dt[, false_tip_type := fifelse(logg_interfere_type %in% ‘on_site’, ‘interfere’, NA_character_), ipip(measure = “smp”, units = “false_tip”, var_type = “chr”)] dt_rain dt_rain discnt 2019-01-16 06:23:18 2023-01-08 13:43:52
4 7 2 dt_calc [, false_tip_type := fifelse(is.na(false_tip_type) & event_type %in% ‘pseudo_events’,‘pseudo_event’, false_tip_type)] dt_rain dt_rain discnt 2019-01-16 06:23:18 2023-01-08 13:43:52
4 7 3 dt_calc [, false_tip_type := fifelse(t_lag == 1, ‘double_tip’, false_tip_type)] dt_rain dt_rain discnt 2019-01-16 06:23:18 2023-01-08 13:43:52
4 7 4 dt_calc [, false_tip := fifelse(!is.na(false_tip_type), TRUE, FALSE), ipip(measure = “smp”, units = “false_tip”, var_type = “logi”)] dt_rain dt_rain discnt 2019-01-16 06:23:18 2023-01-08 13:43:52
4 7 5 dt_calc [, false_tip := fifelse(problem_gap %in% FALSE, FALSE, false_tip)] dt_rain dt_rain discnt 2019-01-16 06:23:18 2023-01-08 13:43:52
4 7 6 dt_calc [, false_tip := fifelse(is.na(false_tip), FALSE, false_tip)] dt_rain dt_rain discnt 2019-01-16 06:23:18 2023-01-08 13:43:52
4 7 7 dt_calc [, .(date_time, rain_cumm, rain_diff, false_tip, false_tip_type, problem_gap)] dt_rain dt_rain discnt 2019-01-16 06:23:18 2023-01-08 13:43:52
4 7 8 dt_calc [false_tip == FALSE, .(date_time, false_tip, false_tip_type, problem_gap), ipip(fork_table = “pseudo_events”)] dt_rain dt_rain discnt 2019-01-16 06:23:18 2023-01-08 13:43:52
4 7 9 dt_calc [, .(date_time)] dt_rain dt_rain discnt 2019-01-16 06:23:18 2023-01-08 13:43:52
4 7 10 dt_calc [, rain_mm := 0.254, ipip(measure = “tot”, units = “mm”, var_type = “num”)] dt_rain dt_rain discnt 2019-01-16 06:23:18 2023-01-08 13:43:52
5 1 1 dt_harvest hsf_param_eval(hsf_table = “dt_rain”) dt_rain dt_rain discnt NA NA
5 2 1 dt_calc dt[!rain_mm == 0, ] dt_rain dt_rain discnt 2019-01-16 06:23:18 2023-01-08 13:43:52
5 3 1 dt_harvest hsf_param_eval(hsf_table = “dt_rain_se”) dt_rain_se dt_rain discnt NA NA
5 4 1 dt_join join_param_eval(join = “full_join”) dt_rain dt_rain discnt 2019-01-14 16:17:19 2023-01-11 17:03:29
6 1 1 dt_harvest hsf_param_eval(hsf_table = “gaps”) gaps dt_gaps_tmp discnt NA NA
6 2 1 dt_calc dt[problem_gap == TRUE, .(gap_start, gap_end, problem_gap)] dt_gaps_tmp dt_gaps_tmp discnt NA NA
7 1 1 dt_harvest hsf_param_eval(hsf_table = “dt_rain”) dt_rain dt_rain discnt NA NA
7 2 1 dt_agg agg_options(agg_intervals=c(“5_mins”,“1_hours”,“1_days”)) dt_rain dt_rain discnt NA NA
7 2 2 dt_agg agg_options(ignore_nas=TRUE) dt_rain dt_rain discnt NA NA
7 2 3 dt_agg agg_options(all_phens=FALSE) dt_rain dt_rain discnt NA NA
7 2 4 dt_agg aggs(rain_mm=.(phen_out_name=“rain_tot”,units=“mm”)) dt_rain dt_rain discnt NA NA
7 3 1 dt_agg agg_options(agg_intervals=“1_days”) dt_rain dt_rain discnt NA NA
7 3 2 dt_agg agg_options(agg_dt_suffix=“_agg_saws”) dt_rain dt_rain discnt NA NA
7 3 3 dt_agg agg_options(ignore_nas=TRUE) dt_rain dt_rain discnt NA NA
7 3 4 dt_agg agg_options(all_phens=FALSE) dt_rain dt_rain discnt NA NA
7 3 5 dt_agg agg_options(agg_offset=“8hours”) dt_rain dt_rain discnt NA NA
7 3 6 dt_agg aggs(rain_mm=.(phen_out_name=“rain_tot”,units=“mm”)) dt_rain dt_rain discnt NA NA
8 1 1 dt_harvest hsf_param_eval(hsf_table = “dt_1_days_agg”) dt_1_days_agg dt_discnt discnt NA NA
8 2 1 dt_agg agg_options(agg_intervals=c(“1_months”,“1_years”)) dt_discnt dt_discnt discnt NA NA
8 2 2 dt_agg agg_options(ignore_nas=TRUE) dt_discnt dt_discnt discnt NA NA

The rainfall-data pipeline was built using the following script. This pipe sequence can be adjusted and re-embedded into a station file at any time.

library(ipayipi)
#' Pipeline rainfall data processing sequence
#' v1.00
#' Last updated 2024-07-30
#'
#' Summary
#' The pipe sequence is documented as `pipe_seq` below. First, calculation
#'  parameters are defined as consecutively numbered 'x' variables. These
#'  are used in the `pipe_seq` object.
#' This `pipe_seq` cleans/processes rainfall data from automatic tipping-
#' bucket rain gauges. Data are processed into the following time-
#' interval aggregations: "5 mins", "hourly", "daily", "monthly", and "yearly".
#' For compatibility with SAWS an eight-hour offset is provided in addition to
#' the standard daily aggregation.

# define constants ----
#' general tipping bucket pipeline sequence for rainfall data
rain_tip <- 0.254 #' value in mm of a single tipping-bucket tip

# calc parameters -> passed to `calc_param_eval()` ----
#' filter the events data to extract 'pseudo events' (i.e., false tips)
#' Pseudo events
x12 <- list(
  pseudo_ev1 = chainer(dt_syn_bc = "event_type == \'pseudo_events\'"),
  pseudo_ev2 = chainer(dt_syn_bc = "stnd_title == station"),
  pseudo_ev3 = chainer(dt_syn_bc = "qa == TRUE")
)
#' create a start and end date_time -- necessary for discontinuous data
#' this will be appended to the discontinuous data series before data
#' aggregation
x22 <- list(
  fork_se1 = chainer(
    dt_syn_ac = "fktest := fifelse(.I == 1 | .I == .N, TRUE, FALSE)"
  ),
  fork_se2 = chainer(dt_syn_bc = "fktest == TRUE"),
  fork_se3 = chainer(dt_syn_ac = "rain_mm := rain_cumm",
    measure = "tot", units = "mm", var_type = "num"
  ),
  fork_se4 = chainer(dt_syn_ac = "rain_mm := 0"),
  fork_se5 = chainer(dt_syn_ac = ".(date_time, rain_mm)")
)
#' repeat the above after it was written to dt_rain_se in chunked data
x32 <- list(
  fork_se1 = chainer(
    dt_syn_ac = "fktest := fifelse(.I == 1 | .I == .N, TRUE, FALSE)"
  ),
  fork_se2 = chainer(dt_syn_bc = "fktest == TRUE"),
  fork_se4 = chainer(dt_syn_ac = "rain_mm := 0"),
  fork_se5 = chainer(dt_syn_ac = ".(date_time, rain_mm)")
)
#' remove double tips (tips one second after the previous)
x42 <- list(
  logg_remove1 = chainer(dt_syn_bc = "!is.na(rain_cumm)"),
  rain_diff = chainer(
    dt_syn_ac = "rain_diff := c(0, rain_cumm[2:.N] - rain_cumm[1:(.N - 1)])",
    measure = "tot", units = "mm", var_type = "num"
  ),
  rain_diff_remove = chainer(dt_syn_bc = "rain_diff != 0"),
  t_lag = chainer(
    dt_syn_ac = "t_lag := c(0, date_time[2:.N] - date_time[1:(.N - 1)])",
    measure = "tot", units = "sec", var_type = "num"
  )
)
#' summarise pseudo events--false tips
x47 <- list(
  false_tip_type1 = chainer(dt_syn_ac = paste0("false_tip_type := fifelse(",
      "logg_interfere_type %in% \'on_site\', \'interfere\', NA_character_)"
    ), temp_var = FALSE, measure = "smp", units = "false_tip", var_type = "chr"
  ),
  false_tip_type2 = chainer(dt_syn_ac = paste0("false_tip_type := fifelse(",
    "is.na(false_tip_type) & event_type %in% \'pseudo_events\',",
    "\'pseudo_event\', false_tip_type)"
  )),
  false_tip_type3 = chainer(dt_syn_ac = paste0("false_tip_type := ",
    "fifelse(t_lag == 1, \'double_tip\', false_tip_type)"
  )),
  false_tip4 = chainer(
    dt_syn_ac = "false_tip := fifelse(!is.na(false_tip_type), TRUE, FALSE)",
    temp_var = FALSE, measure = "smp", units = "false_tip", var_type = "logi"
  ),
  false_tip5 = chainer(dt_syn_ac =
      "false_tip := fifelse(problem_gap %in% FALSE, FALSE, false_tip)"
  ),
  false_tip6 = chainer(
    dt_syn_ac = "false_tip := fifelse(is.na(false_tip), FALSE, false_tip)"
  ),
  clean_up1 = chainer(dt_syn_ac = paste0(".(date_time, rain_cumm, rain_diff, ",
    "false_tip, false_tip_type, problem_gap)"
  )),
  false_tip_table = chainer(dt_syn_bc = "false_tip == FALSE",
    dt_syn_ac = ".(date_time, false_tip, false_tip_type, problem_gap)",
    fork_table = "pseudo_events"
  ),
  clean_up2 = chainer(dt_syn_ac = paste0(".(date_time)")),
  rain_mm = chainer(dt_syn_ac = paste0("rain_mm := ", rain_tip),
    measure = "tot", units = "mm", var_type = "num"
  )
)
#' remove old start end date-time values
x52 <- list(clean_se = chainer(dt_syn_bc = "!rain_mm == 0"))
#' filter gaps before joining --- only want 'problem gaps'
x62 <- list(pgap = chainer(dt_syn_bc = "problem_gap == TRUE",
  dt_syn_ac = ".(gap_start, gap_end, problem_gap)"
))

# build pipe sequence ----
#' this builds the table from the described parameters that will be evaluated
#' to generate parameters for processing the data
pipe_seq <- pipe_seq(p = pdt(
  # extract pseudo events
  p_step(dt_n = 1, dtp_n = 1, f = "dt_harvest",
    f_params = hsf_param_eval(hsf_table = "meta_events"),
    output_dt = "dt_pseudo_event"
  ),
  p_step(dt_n = 1, dtp_n = 2, f = "dt_calc",
    f_params = calc_param_eval(x12),
    output_dt = "dt_pseudo_event"
  ),
  #' create 'fork_se'
  #' the fork start and end date-times will be appended to the data before
  #' time-interval aggregation---NB for event data
  #' this feeds the aggregation function the full range of data to
  #' be aggregated. Without this, the leading and trailing time where events
  #' aren't logged would be cut short in the aggregation
  p_step(dt_n = 2, dtp_n = 1, f = "dt_harvest",
    f_params = hsf_param_eval(hsf_table = "raw_rain"),
    output_dt = "dt_rain_se"
  ),
  p_step(dt_n = 2, dtp_n = 2, f = "dt_calc",
    f_params = calc_param_eval(x22),
    output_dt = "dt_rain_se"
  ),
  # join 'fork_se' with old 'fork_se'
  p_step(dt_n = 3, dtp_n = 1, f = "dt_harvest",
    f_params = hsf_param_eval(hsf_table = "dt_rain_se"),
    output_dt = "dt_rain_se"
  ),
  p_step(dt_n = 3, dtp_n = 2, f = "dt_calc",
    f_params = calc_param_eval(x32),
    output_dt = "dt_rain_se"
  ),
  # find 'double tips'
  p_step(dt_n = 4, dtp_n = 1, f = "dt_harvest",
    f_params = hsf_param_eval(hsf_table = "raw_rain"),
    output_dt = "dt_rain"
  ),
  p_step(dt_n = 4, dtp_n = 2, f = "dt_calc",
    f_params = calc_param_eval(x42),
    output_dt = "dt_rain"
  ),
  # get interference events
  p_step(dt_n = 4, dtp_n = 3, f = "dt_harvest",
    f_params = hsf_param_eval(hsf_table = "logg_interfere"),
    output_dt = "dt_rain"
  ),
  p_step(dt_n = 4, dtp_n = 4, f = "dt_join",
    f_params = join_param_eval(join = "left_join", fuzzy = c(0, 600))
  ),
  # add pseudo events
  p_step(dt_n = 4, dtp_n = 5, f = "dt_harvest",
    f_params = hsf_param_eval(hsf_table = "dt_pseudo_event"),
    output_dt = "dt_rain"
  ),
  p_step(dt_n = 4, dtp_n = 6, f = "dt_join",
    f_params = join_param_eval(join = "left_join", fuzzy = 0,
      y_key = c("start_dttm", "end_dttm")
    ), output_dt = "dt_rain"
  ),
  p_step(dt_n = 4, dtp_n = 7, f = "dt_calc",
    f_params = calc_param_eval(x47),
    output_dt = "dt_rain"
  ),
  p_step(dt_n = 5, dtp_n = 1, f = "dt_harvest",
    f_params = hsf_param_eval(hsf_table = "dt_rain"),
    output_dt = "dt_rain"
  ),
  p_step(dt_n = 5, dtp_n = 2, f = "dt_calc",
    f_params = calc_param_eval(x52),
    output_dt = "dt_rain"
  ),
  p_step(dt_n = 5, dtp_n = 3, f = "dt_harvest",
    f_params = hsf_param_eval(hsf_table = "dt_rain_se"),
    output_dt = "dt_rain"
  ),
  p_step(dt_n = 5, dtp_n = 4, f = "dt_join",
    join_param_eval(), output_dt = "dt_rain"
  ),
  p_step(dt_n = 6, dtp_n = 1, f = "dt_harvest",
    hsf_param_eval(hsf_table = "gaps"),
    output_dt = "dt_gaps_tmp"
  ),
  p_step(dt_n = 6, dtp_n = 2, f = "dt_calc",
    calc_param_eval(x62), output_dt = "dt_gaps_tmp"
  ),
  # aggregate data
  p_step(dt_n = 7, dtp_n = 1, f = "dt_harvest",
    hsf_param_eval(hsf_table = "dt_rain"),
    output_dt = "dt_rain"
  ),
  p_step(dt_n = 7, dtp_n = 2, f = "dt_agg",
    agg_param_eval(
      agg_intervals = c("5 mins", "hourly", "daily"),
      ignore_nas = TRUE, all_phens = FALSE,
      agg_parameters = aggs(
        rain_mm = agg_params(units = "mm", phen_out_name = "rain_tot")
      )
    )
  ),
  # saws daily data aggregation
  p_step(dt_n = 7, dtp_n = 3, f = "dt_agg",
    agg_param_eval(
      agg_offset = "8 hours",
      agg_intervals = c("daily"),
      ignore_nas = TRUE,
      all_phens = FALSE,
      agg_parameters = aggs(
        rain_mm = agg_params(units = "mm", phen_out_name = "rain_tot")
      ),
      agg_dt_suffix = "_agg_saws"
    )
  ),
  p_step(dt_n = 8, dtp_n = 1, f = "dt_harvest",
    hsf_param_eval(hsf_table = "dt_1_days_agg")
  ),
  p_step(dt_n = 8, dtp_n = 2, f = "dt_agg",
    agg_param_eval(
      agg_intervals = c("monthly", "yearly"),
      ignore_nas = TRUE
    )
  )
))

When running the pipe sequence we can choose what portion of it to run by limiting the stages argument of dt_process_batch(); later stages, however, cannot be run without the earlier ones. This allows processing to be interrupted, e.g., for imputing data.

Tips for working in ‘ipayipi’